package com.abcode.test.oracletest.common.utils;

import com.google.common.collect.Lists;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * @version: 1.0
 * @Author: yejing
 * @description: 批量操作数据库工具类
 * @date: 2023-03-20
 */
public class BatchOperateDbUtil {

    private static final Integer size = 1000;

    public static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(16, 32, 1000, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(20), new ThreadPoolExecutor.CallerRunsPolicy());

    /**
     * 批量查询
     *
     * @param fun
     * @param ids
     * @return R
     * @author yej
     * @date 2023/3/30
     */
    public static <T, R> List<R> batchQuery(Function<List<T>, List<R>> fun, List<T> ids) {
        List<CompletableFuture<List<R>>> completableFutures = Lists.newArrayList();

        // 默认1000为一批次
        int batchSize = 500;
        int size = ids.size();
        if (size <= batchSize) {
            completableFutures.add(CompletableFuture.supplyAsync(() -> fun.apply(ids), threadPoolExecutor));
        }

        int len = size / batchSize;
        for (int i = 0; i < len; i++) {
            List<T> splitIds = ids.subList(i * batchSize, (i + 1) * batchSize);
            completableFutures.add(CompletableFuture.supplyAsync(() -> fun.apply(splitIds), threadPoolExecutor));
        }

        if (size > len * batchSize && len > 0) {
            List<T> splitIds = ids.subList(len * batchSize, size);
            completableFutures.add(CompletableFuture.supplyAsync(() -> fun.apply(splitIds), threadPoolExecutor));
        }

        return CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[]{})).thenApply(v -> completableFutures.stream().map(CompletableFuture::join).
                collect(Collectors.toList())).thenApply(lists -> lists.stream().flatMap(Collection::stream).collect(Collectors.toList())).join();
    }

    /**
     * 批量执行
     *
     * @param consumer
     * @param ids
     * @author yej
     * @date 2023/3/30
     */
    public static <T> void batchExecute(Consumer<List<T>> consumer, List<T> ids, Integer count) {
        List<CompletableFuture> completableFutures = Lists.newArrayList();

        // 默认1000为一批次
        int batchSize = size;
        if(count != null){
            batchSize = count;
        }
        int size = ids.size();
        if (size <= batchSize) {
            completableFutures.add(CompletableFuture.runAsync(() -> consumer.accept(ids), threadPoolExecutor));
        }

        int len = size / batchSize;
        for (int i = 0; i < len; i++) {
            List<T> splitIds = ids.subList(i * batchSize, (i + 1) * batchSize);
            completableFutures.add(CompletableFuture.runAsync(() -> consumer.accept(splitIds), threadPoolExecutor));
        }

        // 最后包底
        if (size > len * batchSize && len > 0) {
            List<T> splitIds = ids.subList(len * batchSize, size);
            completableFutures.add(CompletableFuture.runAsync(() -> consumer.accept(splitIds), threadPoolExecutor));
        }

        CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[]{})).join();
    }
}
